package com.clickhouse.examples.formats;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseCredentials;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseNodeSelector;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.http.config.ClickHouseHttpOption;
import com.clickhouse.config.ClickHouseOption;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseInputStream;
import com.clickhouse.examples.protos.UIEvent;
import com.google.protobuf.Message;


import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;


/**
 * Example showing how to write and read Protobuf-encoded rows with the ClickHouse Java client.
 *
 * <p>Connection settings are taken from the system properties {@code chHost}, {@code chPort},
 * {@code chSsl}, {@code chUser} and {@code chPassword}; see {@link #getServer()} for the defaults.
 */
public class ProtobufMain {

    private static final Logger log = Logger.getLogger(ProtobufMain.class.getName());

    /**
     * Inserts the given messages into the {@code default.ui_events} table using the Protobuf format.
     *
     * @param messages messages to insert; each one is written length-prefixed
     * @throws RuntimeException wrapping any failure raised while sending the data
     */
    public void write(List<Message> messages) {
        try (ClickHouseClient client = getClient()) {
            ClickHouseRequest.Mutation mutation = client.write(getServer());
            mutation.table("default.ui_events");
            mutation.format(ClickHouseFormat.Protobuf);
            mutation.data((out) -> {
                for (Message message : messages) {
                    // it is important to write the size of the message before the message itself
                    out.writeVarInt(message.getSerializedSize());
                    message.writeTo(out);
                }
            });

            // The response holds server-side resources, so close it as soon as the summary is logged.
            try (ClickHouseResponse response = mutation.executeAndWait()) {
                log.info("Response: " + response.getSummary());
            }
        } catch (Exception e) {
            log.log(Level.SEVERE, "Failed to write data", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Reads every row of the {@code default.ui_events} table and decodes it as a {@link UIEvent}.
     *
     * @return the decoded messages, in the order they were received; never {@code null}
     * @throws RuntimeException wrapping any failure raised while querying or decoding
     */
    public List<Message> read() {
        try (ClickHouseClient client = getClient()) {
            // The handlers live inside the try-with-resources so that the still-open client can be
            // used for logging, instead of creating a second client that would never be closed.
            try {
                ClickHouseRequest<?> request = client.read(getServer());

                request.table("default.ui_events");
                request.format(ClickHouseFormat.Protobuf);
                request.set("format_protobuf_use_autogenerated_schema", 1); // use the schema generated by ClickHouse

                List<Message> messages = new ArrayList<>();
                try (ClickHouseResponse response = request.executeAndWait()) {
                    ClickHouseInputStream inputStream = response.getInputStream();
                    while (inputStream.available() > 0) {
                        UIEvent event = UIEvent.parseDelimitedFrom(inputStream);
                        if (event == null) {
                            // parseDelimitedFrom signals end of stream with null; never store it.
                            break;
                        }
                        messages.add(event);
                    }
                }

                return messages;
            } catch (ClickHouseException e) {
                log.log(Level.SEVERE, "Failed to read data from server: " + client.getConfig(), e);
                throw new RuntimeException(e);
            } catch (Exception e) {
                log.log(Level.SEVERE, "Failed to read data", e);
                throw new RuntimeException(e);
            }
        }
    }


    /**
     * Writes two sample events and then reads back and logs everything stored in the table.
     *
     * @param args ignored
     */
    public static void main(String[] args) {

        List<Message> messages = new ArrayList<>();

        messages.add(UIEvent.newBuilder()
                .setUrl("http://example.com")
                .setUserId("user1")
                .setSessionId("session1")
                .setTimestamp(System.currentTimeMillis())
                .setEvent("visit")
                .build());

        messages.add(UIEvent.newBuilder()
                .setUrl("http://example.com")
                .setUserId("user1")
                .setSessionId("session1")
                .setTimestamp(System.currentTimeMillis())
                .setDuration(1000)
                .setEvent("leave")
                .build());


        ProtobufMain writer = new ProtobufMain();
        writer.write(messages);

        List<Message> storedMessages = writer.read();
        for (Message message : storedMessages) {
            log.info("Message: " + message);
        }
    }

    /**
     * Builds a new HTTP client that passes {@code format_protobuf_use_autogenerated_schema=1}
     * as a custom parameter.
     *
     * @return a new client; the caller is responsible for closing it
     */
    protected ClickHouseClient getClient() {
        Map<ClickHouseOption, Serializable> map = new HashMap<>();
        map.put(ClickHouseHttpOption.CUSTOM_PARAMS, "format_protobuf_use_autogenerated_schema=1");
        return ClickHouseClient.builder().config(new ClickHouseConfig(map))
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build();
    }

    /**
     * Describes the server to connect to, based on system properties.
     *
     * <p>Defaults: host {@code localhost}, HTTP port {@code 8123}, SSL disabled,
     * user {@code default} with an empty password.
     *
     * @return the node built from the current system properties
     */
    protected ClickHouseNode getServer() {
        return ClickHouseNode.builder()
                .host(System.getProperty("chHost", "localhost"))
                .addOption(ClickHouseClientOption.SSL.getKey(), String.valueOf(Boolean.getBoolean("chSsl")))
                .port(ClickHouseProtocol.HTTP, Integer.getInteger("chPort", 8123))
                .credentials(ClickHouseCredentials.fromUserAndPassword(
                        System.getProperty("chUser", "default"), System.getProperty("chPassword", "")))
                .build();
    }
}
